-
Notifications
You must be signed in to change notification settings - Fork 230
[wip] [draft] Support the single-controller mode #528
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…ocal-inf-engine
…ocal-inf-engine
…ocal-inf-engine
…AReaL into fw/local-inf-engine
…local-inf-engine
…ocal-inf-engine
…AReaL into fw/local-inf-engine
Summary of ChangesHello @garrett4wade, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a significant refactoring to support single-controller training, which provides centralized staleness control, infrastructure agnostic, optimized data communication, and enhanced scalability. It includes new scheduling features, a Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a major and well-executed refactoring to support a single-controller architecture for training. The changes significantly improve modularity, API clarity, and testability by introducing dedicated controllers, a new scheduler API, and a more robust RPC mechanism. The new LocalScheduler and the controller implementations are comprehensive. My feedback focuses on a few areas to enhance robustness and maintainability, such as improving the safety of subprocess creation, making data merging logic less heuristic, and refining the use of magic strings for control flow.
| if "attention_mask" in first_result: | ||
| return DistributedBatchMemory.concat( | ||
| [DistributedBatchMemory.from_dict(r) for r in results] | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic to decide how to merge dictionaries by checking for the presence of "attention_mask" is a bit heuristic and could be brittle. If a batch-like dictionary that needs padding doesn't happen to have this key, or a non-batch dictionary does, the merging logic will be incorrect.
A more robust approach would be to have a more explicit way to identify batch data that needs special concatenation. For example, the remote method could wrap batch data in a specific container class, or return metadata indicating the type of the result.
This would make the merging logic more reliable and less dependent on convention.
areal/engine/sglang_remote.py
Outdated
| def wait_quiet( | ||
| self, count: int, timeout: float | None = None | ||
| ) -> dict[str, Any] | None: | ||
| try: | ||
| return self._engine.wait(count, timeout=timeout) | ||
| except TimeoutError: | ||
| return "NO_RESULT" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The wait_quiet method's return type hint dict[str, Any] | None does not match its implementation, which can also return the string "NO_RESULT". This can lead to type checker errors and confusion.
Additionally, using a magic string like "NO_RESULT" to signal a timeout is not ideal. It makes the code harder to understand and maintain.
Consider one of these alternatives:
- Update the type hint to
dict[str, Any] | None | Literal["NO_RESULT"]to be accurate, as suggested below. - A better approach would be to avoid the magic string. Let this method re-raise the
TimeoutErroror a custom exception, and have the caller inRolloutControllerhandle it. This would make the control flow clearer. - If you want to avoid exceptions, return a sentinel object instead of a string, e.g.,
NO_RESULT = object(). This is safer than string comparisons.
| def wait_quiet( | |
| self, count: int, timeout: float | None = None | |
| ) -> dict[str, Any] | None: | |
| try: | |
| return self._engine.wait(count, timeout=timeout) | |
| except TimeoutError: | |
| return "NO_RESULT" | |
| def wait_quiet( | |
| self, count: int, timeout: float | None = None | |
| ) -> dict[str, Any] | None | str: | |
| try: | |
| return self._engine.wait(count, timeout=timeout) | |
| except TimeoutError: | |
| return "NO_RESULT" |
| process = subprocess.Popen( | ||
| cmd, | ||
| shell=isinstance(cmd, str), | ||
| stdout=sys.stdout, | ||
| stderr=sys.stdout, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use of shell=True with subprocess.Popen (implicitly set because cmd is a string) can be a security risk if any part of the command string is derived from untrusted input. While the risk seems low in this context, it's best practice to avoid shell=True. The command string is constructed to use a pipe (| tee), which necessitates shell=True.
A safer approach would be to manage the output redirection in Python by reading from the subprocess's stdout/stderr and writing to both the log file and sys.stdout. This would allow you to pass the command as a list of arguments to Popen without shell=True.
For example, this could be done in a separate thread to avoid blocking:
# cmd_list should be a list of arguments, not a string
with open(log_file, "ab") as log_f:
process = subprocess.Popen(
cmd_list,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)
for line in iter(process.stdout.readline, b''):
sys.stdout.buffer.write(line)
sys.stdout.flush()
log_f.write(line)This is more complex but avoids the risks of shell injection and makes stream handling more explicit.
Description
This PR introduces a significant refactoring to support single-controller training, which provides the following benefits:
Current Limitations (Proof of Concept)
This implementation is a proof of concept with the following known limitations:
Comparison to Previous Approaches
This PR differs from #410, #415, and #489 primarily in its approach to RPC security and request scheduling during rollout. It provides a cleaner, more efficient implementation with minimal changes to existing APIs.
Roadmap
Over the next few weeks, we will break down these changes into smaller, focused PRs and merge them incrementally into the main branch.
Related Issue
Fixes #260
Type of Change
Checklist
jb build docs/gemini review)Additional Context
Discussion about implementing
RolloutController: #469